

#include "msghandler.h"
#include "package.h"
#include "msgexpress.h"
#include <time.h>

// Counter shared by all MsgHandler instances; every message or reply built in
// this file takes its completion key from a pre-increment of this value.
// NOTE(review): it is updated without any synchronization visible in this
// file - confirm whether handlers can run concurrently.
int MsgHandler :: mMsgSeq = 0;

// Creates a handler bound to `manager`. The session id is zero-filled here and
// is only given its real value when start() runs.
MsgHandler :: MsgHandler( ServiceManager * manager )
{
	mServiceManager = manager;
	memset( &mSid, 0, sizeof( mSid ) );
}

// The destructor body is empty; the manager-side deregistration for this
// session is performed in close().
MsgHandler :: ~MsgHandler()
{
	
}
// Queues one outgoing message carrying `size` bytes of `buffer`, addressed to
// every session listed in `toUsers`.
//
//   response : response object the new message is attached to.
//   buffer   : payload bytes (passed together with an explicit length).
//   size     : number of payload bytes.
//   toUsers  : recipients; nothing is queued when it is NULL or empty.
//
// On Windows the list pointer is handed to the SP_Message constructor. On all
// other platforms the message is default-constructed, so the recipients are
// copied into the message's own to-list; without that copy the message would
// be queued with no recipient at all.
void MsgHandler :: post( SP_Response * response, const char * buffer,size_t size, SP_SidList* toUsers)
{
	if( NULL == toUsers || toUsers->getCount() <= 0 ) {
		return;
	}

#ifdef _WIN32
	SP_Message * msg = new SP_Message(0,toUsers);
#else
	SP_Message * msg = new SP_Message();
	const int recipientCount = toUsers->getCount();
	for( int i = 0; i < recipientCount; ++i ) {
		msg->getToList()->add( toUsers->get( i ) );
	}
	// The ids now live in the message, so the caller's list is released here.
	// NOTE(review): assumes post() owns a non-empty toUsers, mirroring the
	// Windows path where the pointer is passed into the message - confirm
	// against SP_Message and any callers outside this file.
	delete toUsers;
#endif

	msg->setCompletionKey( ++mMsgSeq );
	msg->getMsg()->append( buffer ,size);
	response->addMessage( msg );
}
// Queues a message holding `size` bytes of `buffer` for the single session
// `toSid` and attaches it to `response`. The message's completion key is the
// next value of mMsgSeq.
void MsgHandler :: post( SP_Response * response, const char * buffer,size_t size, SP_Sid_t toSid)
{
	SP_Message * outgoing = new SP_Message();
	outgoing->getToList()->add( toSid );
	outgoing->getMsg()->append( buffer, size );
	outgoing->setCompletionKey( ++mMsgSeq );
	response->addMessage( outgoing );
}
// Broadcast entry point taking a raw buffer and its length. The body is empty,
// so a call currently queues nothing; error() and timeout() pass their notices
// through this function.
void MsgHandler :: broadcast( SP_Response * response, const char * buffer,size_t size, SP_Sid_t * ignoreSid )
{
	
}
// String overload: hands the characters of `msg` and its length to the
// raw-buffer broadcast(), passing `ignoreSid` through unchanged.
void MsgHandler :: broadcast( SP_Response * response, const string& msg, SP_Sid_t * ignoreSid )
{
	const char * payload = msg.c_str();
	const size_t payloadSize = msg.size();
	broadcast( response, payload, payloadSize, ignoreSid );
}
// Session start-up: remembers the session id taken from the response, installs
// a fresh MsgDecoder on the request and adds the session to the manager's user
// list. Always returns 0.
int MsgHandler :: start( SP_Request * request, SP_Response * response )
{
	mSid = response->getFromSid();
	request->setMsgDecoder( new MsgDecoder() );

	mServiceManager->getUserList()->add( mSid );

	return 0;
}
// Returns true when the two type names are equal as C strings. Because the
// comparison goes through strcmp(), anything after an embedded NUL character
// is not taken into account.
bool messageTypeMatch(const std::string& type1,const std::string& type2)
{
	const char * lhs = type1.c_str();
	const char * rhs = type2.c_str();
	if( strcmp( lhs, rhs ) != 0 )
		return false;
	return true;
}
// Handles a RegService request from this session.
//
// Function ids that are not yet in mServiceList are recorded there and passed
// to the service manager under this session's id; ids already present are
// skipped. A Response with retcode 0 is then written into the reply, echoing
// the request's command and serial number. Nothing happens (and no reply is
// written) when the package carries no message.
void MsgHandler :: regService(Package* package, SP_Response * response)
{
	MsgExpress::RegService* regReq=(MsgExpress::RegService*)package->getMessage().get();
	if(NULL==regReq)
		return;

	// Keep only the ids this session has not registered before.
	vector<int> newFunctions;
	const int idCount=regReq->functionid_size();
	for(int idx=0;idx<idCount;++idx)
	{
		const int id=regReq->functionid(idx);
		if(mServiceList.count(id)!=0)
			continue;
		mServiceList.insert(id);
		newFunctions.push_back(id);
	}
	mServiceManager->regService(newFunctions,mSid);

	// Build and queue the acknowledgement.
	MsgExpress::Response ack;
	ack.set_msg("Reg service OK!");
	ack.set_retcode(0);
	std::string encoded;
	MessageUtil::serializePackageToString(ack,PackageHeader::Response,package->getCommand(),package->getSerialNum(),0,&encoded);
	response->getReply()->setCompletionKey( ++mMsgSeq );
	response->getReply()->getMsg()->append(encoded.c_str(),encoded.size());

	printf("Reg service:%s\r\n",regReq->ShortDebugString().c_str());
}
// Handles a Subscribe request from this session.
//
// The subscription is logged, registered with the service manager under this
// session's id value, and acknowledged with a SubscribeResponse (result 0, same
// subscription id) that echoes the request's command and serial number.
// Nothing happens (and no reply is written) when the package carries no
// message.
void MsgHandler :: subscribe(Package* package, SP_Response * response)
{
	MsgExpress::SubscribeData* sub=(MsgExpress::SubscribeData*)package->getMessage().get();
	if(sub==NULL)
		return;
	printf("Subscribe:%s\r\n",sub->ShortDebugString().c_str());
	mServiceManager->subscribe(*sub,mSid.value);

	MsgExpress::SubscribeResponse resp;
	resp.set_result(0);
	resp.set_id(sub->id());
	std::string msg;
	MessageUtil::serializePackageToString(resp,PackageHeader::Response,package->getCommand(),package->getSerialNum(),0,&msg);
	response->getReply()->getMsg()->append(msg.c_str(),msg.size());
	response->getReply()->setCompletionKey( ++mMsgSeq );
}
// Handles a ComplexSubscribe request: first cancels every subscription listed
// in the request's `unsub` entries, then registers every entry in `sub`, all
// under this session's id value. A ComplexSubscribeResponse with result 0 is
// written into the reply, echoing the request's command and serial number.
// Nothing happens (and no reply is written) when the package carries no
// message.
void MsgHandler :: complexSubscribe(Package* package, SP_Response * response)
{
	MsgExpress::ComplexSubscribeData* request=(MsgExpress::ComplexSubscribeData*)package->getMessage().get();
	if(NULL==request)
		return;
	printf("ComplexSubscribe:%s\r\n",request->ShortDebugString().c_str());

	// Removals are applied before additions, as in the request layout.
	const int removeCount=request->unsub_size();
	for(int r=0;r<removeCount;++r)
		mServiceManager->unSubscribe(request->unsub(r).id(),mSid.value);

	const int addCount=request->sub_size();
	for(int a=0;a<addCount;++a)
		mServiceManager->subscribe(request->sub(a),mSid.value);

	MsgExpress::ComplexSubscribeResponse ack;
	ack.set_result(0);
	std::string encoded;
	MessageUtil::serializePackageToString(ack,PackageHeader::Response,package->getCommand(),package->getSerialNum(),0,&encoded);
	response->getReply()->setCompletionKey( ++mMsgSeq );
	response->getReply()->getMsg()->append(encoded.c_str(),encoded.size());
}
// Handles an UnSubscribe request: logs it, cancels the subscription with the
// given id for this session, and writes an UnSubscribeResponse (result 0, same
// id) into the reply, echoing the request's command and serial number.
// Nothing happens (and no reply is written) when the package carries no
// message.
void MsgHandler :: unsubscribe(Package* package, SP_Response * response)
{
	MsgExpress::UnSubscribeData* request=(MsgExpress::UnSubscribeData*)package->getMessage().get();
	if(NULL==request)
		return;
	printf("unSubscribe:%s\r\n",request->ShortDebugString().c_str());

	int subId=request->id();
	mServiceManager->unSubscribe(subId,(unsigned int)mSid.value);

	MsgExpress::UnSubscribeResponse ack;
	ack.set_id(subId);
	ack.set_result(0);
	std::string encoded;
	MessageUtil::serializePackageToString(ack,PackageHeader::Response,package->getCommand(),package->getSerialNum(),0,&encoded);
	response->getReply()->setCompletionKey( ++mMsgSeq );
	response->getReply()->getMsg()->append(encoded.c_str(),encoded.size());
}
// Handles a Publish package: asks the service manager which sessions subscribe
// to the published data and forwards the raw package bytes to each of them as
// a separate message.
//
// Each subscriber is addressed through the single-sid post() overload, so no
// heap-allocated SP_SidList is created per subscriber and the recipient is
// always written into the message's to-list.
//
// Logs "Error:Publish" and returns when the package carries no message.
void MsgHandler :: publish(Package* package, SP_Response * response)
{
	MsgExpress::PublishData* pubData=(MsgExpress::PublishData*)package->getMessage().get();
	if(pubData==NULL)
	{
		printf("Error:Publish\r\n");
		return;
	}

	typedef map<int,vector<MsgExpress::SubscribeData*>*> SubscriberMap;
	SubscriberMap mapUserList;
	mServiceManager->getSubscriberList(pubData,mapUserList);
	// NOTE(review): the vector pointers stored as map values are not released
	// here - confirm that the service manager keeps ownership of them.
	for(SubscriberMap::iterator it=mapUserList.begin();it!=mapUserList.end();++it)
	{
		// The map key is the subscriber's session id value.
		post(response,package->getContent(),package->getSize(),SP_Sid_t(it->first));
	}
	printf("Publish:%s\r\n",pubData->ShortDebugString().c_str());
}
// Routes one decoded package according to its package type.
//
//   Request  : app 0 is served locally (register service, subscribe,
//              unsubscribe, complex subscribe). Any other app is forwarded to
//              one randomly chosen provider of the package's command, after
//              stamping this session's id value as the source address. When no
//              provider is registered, a Response with retcode 10 is written
//              into the reply instead.
//   Publish  : fanned out to subscribers via publish().
//   Response : forwarded to the session named by the package's destination
//              address.
//
// The provider list lives on the stack so it is released on every path, and
// the random generator is seeded only once: reseeding with time(NULL) on each
// request makes every request within the same second pick the same provider.
void MsgHandler :: process(Package* package, SP_Response * response)
{
	const PackageHeader::MsgType packagetype=package->getPackageType();
	if(packagetype==PackageHeader::Request)
	{
		//printf("Request,serial:%d,type:%d,command:%x,app:%d,funcIdx:%d,src:%d,dst:%d,class:%s\r\n",package->getSerialNum(),packagetype,package->getCommand(),package->getApp(),package->getFunctionIdx(),package->getSrcAddr(),package->getDstAddr(),package->getClassType().c_str());

		const unsigned int app=package->getApp();
		const unsigned int functionIdx=package->getFunctionIdx();
		if(app==0)
		{
			// NOTE(review): any other functionIdx falls through without a reply.
			if(functionIdx==MsgExpress::Cmd_RegService)
			{
				regService(package,response);
			}
			else if(functionIdx==MsgExpress::Cmd_Subscribe)
			{
				subscribe(package,response);
			}
			else if(functionIdx==MsgExpress::Cmd_UnSubscribe)
			{
				unsubscribe(package,response);
			}
			else if(functionIdx==MsgExpress::Cmd_ComplexSubscribe)
			{
				complexSubscribe(package,response);
			}
		}
		else
		{
			SP_SidList providers;
			mServiceManager->getServiceProviderList(package->getCommand(),&providers);
			const int providerCount=providers.getCount();
			if(providerCount>0)
			{
				package->setSrcAddr(mSid.value);

				// Seed once; a repeated seed would only restart the sequence.
				static bool seeded=false;
				if(!seeded)
				{
					srand((unsigned int)time(NULL));
					seeded=true;
				}

				const int index=rand() % providerCount;
				SP_Sid_t sid(0);
				sid=providers.get(index);
				post(response,package->getContent(),package->getSize(),sid);
			}
			else //send common response msg
			{
				MsgExpress::Response resp;
				resp.set_retcode(10);
				resp.set_msg("No service available!");
				std::string msg;
				MessageUtil::serializePackageToString(resp,PackageHeader::Response,0,package->getSerialNum(),0,&msg);
				response->getReply()->getMsg()->append(msg.c_str(),msg.size());
				response->getReply()->setCompletionKey( ++mMsgSeq );
				printf("No service available:%x\r\n",package->getCommand());
			}
		}
	}
	else if(packagetype==PackageHeader::Publish)
	{
		publish(package,response);
	}
	else if(packagetype==PackageHeader::Response)
	{
		const unsigned int dst=package->getDstAddr();
		SP_Sid_t sid(dst);
		post(response,package->getContent(),package->getSize(),sid);
		//printf("Response,serial:%d,type:%d,command:%x,app:%d,funcIdx:%d,src:%d,dst:%d,class:%s\r\n",package->getSerialNum(),packagetype,package->getCommand(),package->getApp(),package->getFunctionIdx(),package->getSrcAddr(),package->getDstAddr(),package->getClassType().c_str());
	}
}

// Drains the decoder's queue of decoded packages, passing each one to
// process() and then destroying it. Always returns 0.
//
// Each entry is deleted through its Package pointer. Deleting it through the
// untyped void* returned by pop() is undefined behaviour and would not run the
// Package destructor.
int MsgHandler :: handle( SP_Request * request, SP_Response * response )
{
	MsgDecoder * decoder = (MsgDecoder*)request->getMsgDecoder();
	SP_BlockingQueue * msgQueue = decoder->getQueue();

	while( NULL != msgQueue->top() ) {
		Package * package = (Package*)msgQueue->pop();
		if( NULL != package ) {
			process( package, response );
			delete package;
		}
	}
	return 0;
}

// Error callback: formats a "SYS : <key> error offline" notice for this
// session and passes it to broadcast(), naming this session as the one to
// ignore.
void MsgHandler :: error( SP_Response * response )
{
	char notice[ 64 ] = { 0 };
	snprintf( notice, sizeof( notice ), "SYS : %d error offline\r\n", mSid.mKey );

	const size_t noticeLen = strlen( notice );
	broadcast( response, notice, noticeLen, &mSid );
}

// Timeout callback: formats a "SYS : <key> timeout offline" notice for this
// session and passes it to broadcast(), naming this session as the one to
// ignore.
void MsgHandler :: timeout( SP_Response * response )
{
	char notice[ 64 ] = { 0 };
	snprintf( notice, sizeof( notice ), "SYS : %d timeout offline\r\n", mSid.mKey );

	const size_t noticeLen = strlen( notice );
	broadcast( response, notice, noticeLen, &mSid );
}

// Session teardown: removes the session from the manager's user list,
// unregisters the function ids recorded in mServiceList (only when there are
// any), and drops all of the session's subscriptions.
void MsgHandler :: close()
{
	mServiceManager->getUserList()->remove( mSid );

	if( 0 != mServiceList.size() ) {
		mServiceManager->unregService( mServiceList, mSid );
	}

	mServiceManager->unSubscribeAll( mSid.value );
}


// Completion callback for an outgoing message: destroys the message. The
// block under `#if 0` is a disabled trace that would print the completion key
// and the success/failure recipient keys.
void MsgCompletionHandler :: completionMessage( SP_Message * msg )
{
#if 0
	printf( "message completed { completion key : %d }\n", msg->getCompletionKey() );

	printf( "\tsuccess {" );
	for( int ok = 0; ok < msg->getSuccess()->getCount(); ++ok ) {
		printf( " %d", msg->getSuccess()->get( ok ).mKey );
	}
	printf( "}\n" );

	printf( "\tfailure {" );
	for( int bad = 0; bad < msg->getFailure()->getCount(); ++bad ) {
		printf( " %d", msg->getFailure()->get( bad ).mKey );
	}
	printf( "}\n" );
#endif

	delete msg;
}